package com.uptown.config;

import com.uptown.mq.KafkaConsumerRunnable;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.uptown.util.RedisUtils;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @Author lixiaofei
 * @create 2020/11/2 8:32 下午
 */
@Data
@Component
public class KafkaConsumerConfig {

    // 线程池维护线程的最少数量
    @Value(value = "${kafka.core-pool-size}")
    private int corePoolSize;

    // 线程池维护线程的最大数量
    @Value(value = "${kafka.max-pool-size}")
    private int maxPoolSize;

    // 线程池维护线程所允许的空闲时间
    @Value(value = "${kafka.keep-alive-time}")
    private int keepAliveTime;

    // 线程池所使用的缓冲队列大小
    @Value(value = "${kafka.work-queue-size}")
    private int workQueueSize;


    @Value(value = "${kafka.redis.prefix}")
    private String redisPreFix;

    @Value(value = "${kafka.redis.service-list}")
    private String serviceList;

    // 统一存放kafka客户端的map
    @Bean(name = "globalKafkaConsumerThreadMap")
    public Map<String, KafkaConsumerRunnable> globalKafkaConsumerThreadMap() {
        return new HashMap<>();
    }

    /**
     * kafka监听任务 线程池
     * 1、当线程数小于核心线程数 创建线程
     * 2、当线程数大于等于核心线程数 且任务队未满 假如任务队列
     * 3、当线程数大于等于核心线程数 且任务队列满 抛异常
     */
    @Bean(name = "defaultThreadPool")
    ThreadPoolExecutor defaultThreadPool() {
        MyExceptionHandler exceptionHandler = new MyExceptionHandler();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d")
                .setUncaughtExceptionHandler(exceptionHandler).build();
        CustomThreadPoolExecutor threadPool = new CustomThreadPoolExecutor(
                corePoolSize,                                 //核心线程池大小
                maxPoolSize,                                  //最大核心线程数
                keepAliveTime,                                //超时了没有人调用就会释放
                TimeUnit.SECONDS,                               //超时单位
                new LinkedBlockingDeque<>(maxPoolSize),          //阻塞任务队列
                threadFactory,               //线程工厂
                new ThreadPoolExecutor.AbortPolicy()       //线程达到MAX_POOL_SIZE最大核心线程数且阻塞队列满，丢弃任务抛出RejectedExecutionException异常
        );
        return threadPool;
    }

    /**
     * 自定义异常获取，线程池中的线程挂掉后捕获异常
     */
    public static class MyExceptionHandler implements Thread.UncaughtExceptionHandler {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            System.out.println(String.format("kafka thread name %s, msg %s", t.getName(), e.getStackTrace()));
        }
    }


    public List<String> getRedisServiceList() {
        return RedisUtils.lGet(serviceList, 0, -1)
                .stream().map(th -> (String) th).collect(Collectors.toList());
    }
}

